//nolint
/*
Copyright (c) 2013-2016 Errplane Inc.
This code is originally from: https://github.com/influxdata/influxdb/blob/1.7/services/meta/client.go

2022.01.23 change http to rpc
Add TagKeys,FieldKeys etc.
Copyright 2022 Huawei Cloud Computing Technologies Co., Ltd.
*/

package metaclient

import (
	"fmt"
	"sync"
	"time"

	set "github.com/deckarep/golang-set/v2"
	"github.com/influxdata/influxdb/models"
	originql "github.com/influxdata/influxql"
	"github.com/openGemini/openGemini/app/ts-meta/meta/message"
	"github.com/openGemini/openGemini/engine/op"
	"github.com/openGemini/openGemini/lib/config"
	"github.com/openGemini/openGemini/lib/obs"
	"github.com/openGemini/openGemini/lib/spdy"
	"github.com/openGemini/openGemini/lib/spdy/transport"
	"github.com/openGemini/openGemini/lib/util/lifted/influx/influxql"
	meta2 "github.com/openGemini/openGemini/lib/util/lifted/influx/meta"
	proto2 "github.com/openGemini/openGemini/lib/util/lifted/influx/meta/proto"
	"github.com/openGemini/openGemini/lib/util/lifted/influx/query"
	"github.com/openGemini/openGemini/lib/util/lifted/protobuf/proto"
)

// Package-level limits and settings for password hashing, name validation,
// RPC timeouts and login throttling.
const (
	// SaltBytes is the number of bytes used for salts.
	SaltBytes = 32

	// PBKDF2 iteration counts.
	pbkdf2Iter4096 = 4096
	pbkdf2Iter1000 = 1000

	// pbkdf2KeyLen is the length of the key generated by PBKDF2.
	pbkdf2KeyLen = 32

	// Version flags identifying the hash algorithm used for a saved password.
	hashAlgoVerOne = "#Ver:001#"
	// PBKDF2 with 4096 iterations.
	hashAlgoVerTwo = "#Ver:002#"
	// PBKDF2 with 1000 iterations.
	hashAlgoVerThree = "#Ver:003#"

	// maxDbOrRpName is the length bound reported by ErrNameTooLong.
	maxDbOrRpName = 256

	RetentionDelayedTime = 24 * time.Hour // for logkeeper service

	// RPCReqTimeout is the timeout that RPCMessageSender.SendRPCMsg sets on
	// each meta transport.
	RPCReqTimeout = 10 * time.Second

	// Settings for user locking.
	maxLoginLimit      = 5    // Maximum number of login attempts.
	authFailCacheLimit = 200  // Size of the channel for processing authentication failures.
	lockUserTime       = 30   // User lock duration, in seconds.
	maxLoginValidTime  = 3600 // Validity duration of login records, in seconds.

	minUsernameLen = 4   // Minimum username length
	maxUsernameLen = 100 // Maximum username length

	minPasswordLen = 8   // Minimum password length
	maxPasswordLen = 256 // Maximum password length
)

// Role is an integer enumeration with the values SQL, STORE and META.
// NOTE(review): presumably identifies the kind of node a client runs in — confirm.
type Role int

const (
	// Role values, numbered from zero by iota.
	SQL Role = iota
	STORE
	META

	// Numeric algorithm versions. These are explicit untyped constants and are
	// not part of the Role iota sequence above.
	// NOTE(review): presumably paired with hashAlgoVerOne/Two/Three — confirm.
	algoVer01 = 1
	algoVer02 = 2
	algoVer03 = 3
)

// Package-level tunables and errors. These are variables rather than
// constants, so they can be reassigned at run time.
//
//   - errSleep: a one-second duration. NOTE(review): presumably the pause
//     between retries after an error — confirm against its users.
//   - ErrNameTooLong: error whose message embeds maxDbOrRpName.
//   - RetryExecTimeout, RetryReportTimeout: 60-second timeouts.
//   - HttpReqTimeout: 10-second timeout.
var (
	errSleep           = time.Second
	ErrNameTooLong     = fmt.Errorf("database name must have fewer than %d characters", maxDbOrRpName)
	RetryExecTimeout   = 60 * time.Second
	RetryReportTimeout = 60 * time.Second
	HttpReqTimeout     = 10 * time.Second
)

// DefaultTypeMapper combines the operator, math, function, string-function,
// label-function and Prometheus time-function type mappers listed below into a
// single mapper via influxql.MultiTypeMapper.
var DefaultTypeMapper = influxql.MultiTypeMapper(
	op.TypeMapper{},
	query.MathTypeMapper{},
	query.FunctionTypeMapper{},
	query.StringFunctionTypeMapper{},
	query.LabelFunctionTypeMapper{},
	query.PromTimeFunctionTypeMapper{},
)

// DefaultMetaClient is a package-level *Client; it is nil until assigned.
// NOTE(review): presumably initialised exactly once under cliOnce — confirm
// against the constructor, which is not visible in this part of the file.
var DefaultMetaClient *Client
var cliOnce sync.Once

// StorageNodeInfo is the parameter bundle passed to
// NodeManager.CreateDataNode. It carries an insert address, a select address,
// an Az string and retry settings (RetryTime, RetryNumber).
// NOTE(review): Az presumably names an availability zone — confirm.
type StorageNodeInfo struct {
	InsertAddr  string
	SelectAddr  string
	Az          string
	RetryTime   time.Duration
	RetryNumber int
}

// SqlNodeInfo carries an HTTP address, a gossip address and retry settings
// (RetryTime, RetryNumber).
// NOTE(review): presumably the SQL-node counterpart of StorageNodeInfo used
// when registering a SQL node — confirm against callers.
type SqlNodeInfo struct {
	HttpAddr    string
	GossipAddr  string
	RetryTime   time.Duration
	RetryNumber int
}

// FieldKey pairs a field name with an int32 type code.
type FieldKey struct {
	Field     string
	FieldType int32
}

// FieldKeys is a slice of FieldKey that can be sorted by field name; its
// Len, Swap and Less methods have the shape required by sort.Interface.
type FieldKeys []FieldKey

// Len returns the number of keys in the slice.
func (ks FieldKeys) Len() int {
	return len(ks)
}

// Swap exchanges the keys at positions i and j.
func (ks FieldKeys) Swap(i, j int) {
	tmp := ks[i]
	ks[i] = ks[j]
	ks[j] = tmp
}

// Less reports whether the field name at position i sorts strictly before the
// field name at position j. FieldType does not take part in the ordering.
func (ks FieldKeys) Less(i, j int) bool {
	left, right := ks[i].Field, ks[j].Field
	return left < right
}

// MetaClient is an interface for accessing meta data. It is the union of the
// per-area manager interfaces embedded below, plus two methods declared
// directly on it.
type MetaClient interface {
	MetadataManager
	DatabaseManager
	NodeManager
	ShardManager
	UserManager
	SystemManager
	SubscriptionManager
	ContinuousQueryManager
	DownSampleManager
	RepManager
	MeasurementManager
	StreamManager
	OpenAtStore() error
	RetryRegisterQueryIDOffset(host string) (uint64, error)
}

// MeasurementManager declares the measurement-level methods of the meta
// client: creating a measurement, altering its shard key, marking it deleted,
// and looking measurements and tag keys up by database. It is embedded in
// MetaClient.
type MeasurementManager interface {
	CreateMeasurement(database, retentionPolicy, mst string, shardKey *meta2.ShardKeyInfo, numOfShards int32, indexR *influxql.IndexRelation, engineType config.EngineType,
		colStoreInfo *meta2.ColStoreInfo, schemaInfo []*proto2.FieldSchema, options *meta2.Options) (*meta2.MeasurementInfo, error)
	AlterShardKey(database, retentionPolicy, mst string, shardKey *meta2.ShardKeyInfo) error
	MarkMeasurementDelete(database, policy, measurement string) error
	GetMeasurementID(database string, rpName string, mstName string) (uint64, error)
	QueryTagKeys(database string, ms influxql.Measurements, cond influxql.Expr) (map[string]map[string]struct{}, error)
	MatchMeasurements(database string, ms influxql.Measurements) (map[string]*meta2.MeasurementInfo, error)
	Measurements(database string, ms influxql.Measurements) ([]string, error)
}

// RepManager declares the replication-related methods of the meta client:
// a shard-ID set for a database and time window, the replica groups of a
// database, and a database's replica count. It is embedded in MetaClient.
type RepManager interface {
	ThermalShards(db string, start, end time.Duration) map[uint64]struct{}
	DBRepGroups(database string) []meta2.ReplicaGroup
	GetReplicaN(database string) (int, error)
}

// DownSampleManager declares the methods for creating, dropping and listing
// downsample policies of a database. It is embedded in MetaClient.
type DownSampleManager interface {
	NewDownSamplePolicy(database, name string, info *meta2.DownSamplePolicyInfo) error
	DropDownSamplePolicy(database, name string, dropAll bool) error
	ShowDownSamplePolicies(database string) (models.Rows, error)
}

// ContinuousQueryManager declares the methods for creating, listing and
// dropping continuous queries, plus SendSql2MetaHeartbeat. It is embedded in
// MetaClient.
//
// Note the argument order: CreateContinuousQuery takes (database, name, ...)
// whereas DropContinuousQuery takes (name, database).
type ContinuousQueryManager interface {
	CreateContinuousQuery(database, name, query string) error
	ShowContinuousQueries() (models.Rows, error)
	DropContinuousQuery(name string, database string) error
	SendSql2MetaHeartbeat(host string) error
}

// SubscriptionManager declares the methods for creating, dropping and listing
// subscriptions, which are addressed by database, retention policy and name.
// It is embedded in MetaClient.
type SubscriptionManager interface {
	CreateSubscription(database, rp, name, mode string, destinations []string) error
	DropSubscription(database, rp, name string) error
	ShowSubscriptions() models.Rows
}

// SystemManager declares system-level methods of the meta client: sending
// sys-control and backup requests to meta, file registration, merge-shard
// replacement, and several shard/index lookups. It is embedded in MetaClient.
//
// IsMasterPt and InsertFiles declare unnamed parameters; their meaning is not
// visible from this declaration and must be taken from the implementation.
type SystemManager interface {
	SendSysCtrlToMeta(mod string, param map[string]string) (map[string]string, error)
	SendBackupToMeta(mod string, param map[string]string) (map[string]string, error)
	IsSQLiteEnabled() bool
	InsertFiles([]meta2.FileInfo) error
	IsMasterPt(uint32, string) bool
	ReplaceMergeShards(mergeShards meta2.MergeShards) error
	GetTimeRange(db, rp string, sShardId, eShardId uint64) (*meta2.ShardTimeRangeInfo, error)
	GetNoClearShardId(id uint64, db string, groupID uint64, policy string) (uint64, error)
	GetNoClearIndexId(id uint64, db string, policy string) (uint64, error)
}

// StreamManager declares the methods for creating stream policies and stream
// measurements, updating a stream measurement's schema, and listing or
// dropping streams. It is embedded in MetaClient.
type StreamManager interface {
	CreateStreamPolicy(info *meta2.StreamInfo) error
	CreateStreamMeasurement(info *meta2.StreamInfo, src, dest *influxql.Measurement, stmt *influxql.SelectStatement) error
	UpdateStreamMstSchema(database string, retentionPolicy string, mst string, stmt *influxql.SelectStatement) error
	GetStreamInfos() map[string]*meta2.StreamInfo
	ShowStreams(database string, showAll bool) (models.Rows, error)
	DropStream(name string) error
}

// UserManager declares the user-management methods of the meta client:
// creating, updating and dropping users, setting and reading privileges,
// listing users, and authenticating a username/password pair. It is embedded
// in MetaClient.
//
// Privileges use the Privilege type of the upstream influxql package
// (imported here as originql), not the lifted influxql package.
type UserManager interface {
	CreateUser(name, password string, admin, rwuser bool) (meta2.User, error)
	DropUser(name string) error
	SetAdminPrivilege(username string, admin bool) error
	SetPrivilege(username, database string, p originql.Privilege) error
	UpdateUser(name, password string) error
	UserPrivilege(username, database string) (*originql.Privilege, error)
	UserPrivileges(username string) (map[string]originql.Privilege, error)
	Users() []meta2.UserInfo
	Authenticate(username, password string) (u meta2.User, e error)
	AdminUserExists() bool
}

// ShardManager declares the shard-level methods of the meta client: listing
// shard groups, resolving the owner of a shard, selecting shard groups by time
// range, dropping a shard, and updating a shard's tier or downsample
// information. It is embedded in MetaClient.
type ShardManager interface {
	ShowShardGroups() models.Rows
	ShardOwner(shardID uint64) (database, policy string, sgi *meta2.ShardGroupInfo)
	GetAliveShards(database string, sgi *meta2.ShardGroupInfo, isRead bool) []int
	DropShard(id uint64) error
	UpdateShardInfoTier(shardID uint64, tier uint64, dbName, rpName string) error
	UpdateShardDownSampleInfo(Ident *meta2.ShardIdentifier) error
	ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta2.ShardGroupInfo, err error)
}

// MetadataManager declares the metadata lookup methods of the meta client:
// databases, retention policies, measurements, tag and field keys, schemas,
// partition views and shard listings. It also declares UpdateIndexInfoTier.
// It is embedded in MetaClient.
type MetadataManager interface {
	GetMstInfoWithInRp(dbName, rpName string, dataTypes []int64) (*meta2.RpMeasurementsFieldsInfo, error)
	GetAllMst(dbName string) []string
	DatabaseOption(name string) (*obs.ObsOptions, error)
	DBPtView(database string) (meta2.DBPtInfos, error)
	GetNodePtsMap(database string) (map[uint64][]uint32, error)
	Databases() map[string]*meta2.DatabaseInfo
	Database(name string) (*meta2.DatabaseInfo, error)
	RetentionPolicy(database, name string) (*meta2.RetentionPolicyInfo, error)
	Measurement(database string, rpName string, mstName string) (*meta2.MeasurementInfo, error)
	GetMeasurements(m *influxql.Measurement) ([]*meta2.MeasurementInfo, error)
	TagKeys(database string) map[string]set.Set[string]
	FieldKeys(database string, ms influxql.Measurements) (map[string]map[string]int32, error)
	Schema(database string, retentionPolicy string, mst string) (fields map[string]int32, dimensions map[string]struct{}, err error)
	ShowShards(database string, rp string, mst string) models.Rows
	ShowRetentionPolicies(database string) (models.Rows, error)
	UpdateIndexInfoTier(indexID uint64, tier uint64, dbName, rpName string) error
}

// DatabaseManager declares the methods for creating databases and retention
// policies, marking them deleted, and updating a retention policy. It is
// embedded in MetaClient.
type DatabaseManager interface {
	CreateDatabase(name string, enableTagArray bool, replicaN uint32, options *obs.ObsOptions) (*meta2.DatabaseInfo, error)
	CreateDatabaseWithRetentionPolicy(name string, spec *meta2.RetentionPolicySpec, shardKey *meta2.ShardKeyInfo, enableTagArray bool, replicaN uint32) (*meta2.DatabaseInfo, error)
	MarkDatabaseDelete(name string) error
	CreateRetentionPolicy(database string, spec *meta2.RetentionPolicySpec, makeDefault bool) (*meta2.RetentionPolicyInfo, error)
	MarkRetentionPolicyDelete(database, name string) error
	UpdateRetentionPolicy(database, name string, rpu *meta2.RetentionPolicyUpdate, makeDefault bool) error
}

// NodeManager declares the node-level methods of the meta client: showing the
// cluster, creating, deleting and looking up data nodes, and deleting and
// listing meta nodes. It is embedded in MetaClient.
//
// CreateDataNode returns three uint64 values and an error; the meaning of the
// three numbers is not visible from this declaration.
type NodeManager interface {
	ShowCluster(nodeType string, ID uint64) (models.Rows, error)
	ShowClusterWithCondition(nodeType string, ID uint64) (models.Rows, error)
	CreateDataNode(storageNodeInfo *StorageNodeInfo, role string) (uint64, uint64, uint64, error)
	DeleteDataNode(id uint64) error
	DataNode(id uint64) (*meta2.DataNode, error)
	DataNodes() ([]meta2.DataNode, error)
	AliveReadNodes() ([]meta2.DataNode, error)
	DeleteMetaNode(id uint64) error
	MetaNodes() ([]meta2.NodeInfo, error)
}

// LoadCtx bundles a channel of *DBPTCtx values (LoadCh) with a sync.Pool
// (ReportCtx) from which GetReportCtx takes, and to which PutReportCtx
// returns, *DBPTCtx values.
//
// Because it contains a sync.Pool, a LoadCtx must not be copied after first
// use; its methods use pointer receivers.
type LoadCtx struct {
	LoadCh    chan *DBPTCtx
	ReportCtx sync.Pool
}

// GetReportCtx takes a *DBPTCtx from the ReportCtx pool. When the pool has
// nothing to hand out it returns a freshly allocated, zero-valued DBPTCtx
// (whose DBPTStat is nil until GetDBPTStat is called).
func (ctx *LoadCtx) GetReportCtx() *DBPTCtx {
	if pooled := ctx.ReportCtx.Get(); pooled != nil {
		return pooled.(*DBPTCtx)
	}
	return &DBPTCtx{}
}

// PutReportCtx resets dbPTCtx and returns it to the ReportCtx pool.
//
// The DB and PtID fields of the status are reset to "" and 0, and the RpStats
// slice is scrubbed and handed to the context's own RpStatusPool by putRpStat.
//
// The status is obtained through GetDBPTStat rather than by reading the field
// directly: GetReportCtx returns a zero DBPTCtx when the pool is empty, so
// DBPTStat may still be nil here, and dereferencing it would panic. A nil
// dbPTCtx is ignored instead of being placed in the pool.
func (ctx *LoadCtx) PutReportCtx(dbPTCtx *DBPTCtx) {
	if dbPTCtx == nil {
		return
	}

	stat := dbPTCtx.GetDBPTStat()
	stat.DB = proto.String("")
	stat.PtID = proto.Uint32(0)
	dbPTCtx.putRpStat(&stat.RpStats)

	ctx.ReportCtx.Put(dbPTCtx)
}

// DBPTCtx holds a *proto2.DBPtStatus together with a sync.Pool of
// *[]*proto2.RpShardStatus values (see GetRpStat and putRpStat).
//
// DBPTStat is nil in a zero DBPTCtx; GetDBPTStat allocates it on first use.
// Because it contains a sync.Pool, a DBPTCtx must not be copied after first
// use; its methods use pointer receivers.
type DBPTCtx struct {
	DBPTStat     *proto2.DBPtStatus
	RpStatusPool sync.Pool
}

// GetDBPTStat returns the context's DBPtStatus, allocating an empty one and
// storing it in DBPTStat the first time it is requested.
func (r *DBPTCtx) GetDBPTStat() *proto2.DBPtStatus {
	if r.DBPTStat != nil {
		return r.DBPTStat
	}

	r.DBPTStat = &proto2.DBPtStatus{}
	return r.DBPTStat
}

// GetRpStat takes a slice of RpShardStatus pointers from RpStatusPool and
// returns it by value. When the pool has nothing to hand out it returns a new
// empty (non-nil) slice.
func (r *DBPTCtx) GetRpStat() []*proto2.RpShardStatus {
	if pooled := r.RpStatusPool.Get(); pooled != nil {
		// The pool stores pointers to slices (see putRpStat); dereference to
		// hand the slice header back to the caller.
		return *(pooled.(*[]*proto2.RpShardStatus))
	}
	return []*proto2.RpShardStatus{}
}

// putRpStat scrubs every status in *rss (name to "", shard ID, size, series
// count and max time to 0), truncates *rss to length zero while keeping its
// backing array, and places the slice pointer in RpStatusPool.
func (r *DBPTCtx) putRpStat(rss *[]*proto2.RpShardStatus) {
	for _, status := range *rss {
		status.RpName = proto.String("")
		status.ShardStats.ShardID = proto.Uint64(0)
		status.ShardStats.ShardSize = proto.Uint64(0)
		status.ShardStats.SeriesCount = proto.Int32(0)
		status.ShardStats.MaxTime = proto.Int64(0)
	}

	// Keep the capacity, drop the length.
	*rss = (*rss)[:0]
	r.RpStatusPool.Put(rss)
}

// String returns the String() form of the wrapped DBPtStatus, or the empty
// string when no status has been allocated yet.
func (r *DBPTCtx) String() string {
	if stat := r.DBPTStat; stat != nil {
		return stat.String()
	}
	return ""
}

// SendRPCMessage abstracts sending a meta message to the server identified by
// the index currentServer, with callback supplied to handle the response.
// RPCMessageSender has a SendRPCMsg method with this signature.
type SendRPCMessage interface {
	SendRPCMsg(currentServer int, msg *message.MetaMessage, callback transport.Callback) error
}

// RPCMessageSender is a stateless sender of meta messages over the meta
// transport.
type RPCMessageSender struct{}

// SendRPCMsg sends msg to the meta server with index currentServer and blocks
// until the exchange completes.
//
// It creates a meta transport of type spdy.MetaRequest bound to callback,
// applies RPCReqTimeout, sends the message and waits for the transport to
// finish. Any error from creating, sending or waiting is returned unchanged.
// On success currentServer is recorded through refreshConnectedServer.
func (s *RPCMessageSender) SendRPCMsg(currentServer int, msg *message.MetaMessage, callback transport.Callback) error {
	metaTrans, err := transport.NewMetaTransport(uint64(currentServer), spdy.MetaRequest, callback)
	if err != nil {
		return err
	}
	metaTrans.SetTimeout(RPCReqTimeout)

	// Wait only if the send itself went through.
	err = metaTrans.Send(msg)
	if err == nil {
		err = metaTrans.Wait()
	}
	if err != nil {
		return err
	}

	refreshConnectedServer(currentServer)
	return nil
}

// refreshConnectedServer stores currentServer in the package-level
// connectedServer variable when the two differ; otherwise it does nothing.
func refreshConnectedServer(currentServer int) {
	if connectedServer == currentServer {
		return
	}
	connectedServer = currentServer
}

// Peers is a list of peer names.
// NOTE(review): presumably meta-node addresses — confirm against callers.
type Peers []string

// Append returns a new, duplicate-free list holding the receiver's entries
// followed by p, in first-occurrence order.
//
// The entries are copied into a fresh slice before p is added, so the
// receiver's backing array is never written to even when it has spare
// capacity.
func (peers Peers) Append(p ...string) Peers {
	merged := make(Peers, 0, len(peers)+len(p))
	merged = append(merged, peers...)
	merged = append(merged, p...)

	return merged.Unique()
}

// Unique returns a copy of peers with duplicates removed. The first occurrence
// of each entry is kept and the relative order of the kept entries is
// preserved, so the result is deterministic for a given input.
//
// It returns nil when peers is empty.
func (peers Peers) Unique() Peers {
	if len(peers) == 0 {
		return nil
	}

	seen := make(map[string]struct{}, len(peers))
	u := make(Peers, 0, len(peers))
	for _, p := range peers {
		if _, dup := seen[p]; dup {
			continue
		}
		seen[p] = struct{}{}
		u = append(u, p)
	}
	return u
}

// Contains reports whether peer is an element of peers.
func (peers Peers) Contains(peer string) bool {
	for _, p := range peers {
		if p == peer {
			return true
		}
	}
	return false
}

// errCommand is an error value whose text is the message it was built with.
type errCommand struct {
	msg string
}

// Error implements the error interface by returning the stored message.
func (ec errCommand) Error() string {
	return ec.msg
}
